#!/usr/bin/perl
use strict;
use MIME::Base64;
use threads;
use Thread::Queue;
use Getopt::Long qw(:config bundling no_ignore_case no_autoabbrev passthrough);
use IO::Handle qw();
use Fcntl qw(:flock);
use POSIX;
use Time::HiRes qw(usleep nanosleep);

my ($DATABASE_NAME,$PORT,$FROM_FILE,$OBJECT_BIT,$BATCH_SIZE,$IS_HELP,$VERSION);
my %BATCH_REGION_HASH = ("Language" => "","Schema" => "","Type" => "","Function" => "","Sequence" => "","Table" => "","ExtTable" => "","View" => "","Index" => "","Comment" => "");
my $REGION_TYPE;
my %OBJECT_BIT_HASH = ("ResourceQueue" => 8192,"Role" => 4096,"TableSpace" => 2048,"RoleSetting" => 1024,"Language" => 512, "Schema" => 256, "Type" => 128, "Function" => 64,
    "Sequence" => 32,"Table" => 16,"ExtTable" => 8,"View" => 4,"Index" => 2,"Comment" => 1);
my ($DEFAULT_BATCH,$SQL_BATCH_SIZE) = (1,1);
my %OBJECT_BATCH_HASH = ("ResourceQueue" => 16,"Role" => 16,"TableSpace" => 16,"RoleSetting" => 16,"Language" => 4, "Schema" => 4, "Type" => 4, "Function" => 16,
    "Sequence" => 16,"Table" => 64,"ExtTable" => 64,"View" => 64,"Index" => 1,"Comment" => 128);

my ($LOCK_FILE_HANDLE);
my ($FILE_HANDLE,$TASK_QUEUE,$MSGE_QUEUE);
my (@TASK_THREAD,$MSGE_THREAD);
my ($BATCH_MAX,$BATCH_DEFAULT,$BATCH_MIN,$CURR_BATCH_SIZE) = (128,32,8);

my ($CMD_SPLIT,$SQL_DELIM,$RECORD_SPLIT) = (chr(1).chr(0).chr(2).chr(7),chr(3).chr(4).chr(8),chr(5).chr(6).chr(9).chr(10));
my $REGION_START  = qq#--REGION-START-#.$CMD_SPLIT;
my $REGION_END    = qq#--REGION-END---#.$CMD_SPLIT;
my $TASK_SPLIT    = qq#--TASK---SPLIT-#.$CMD_SPLIT;

(my $CMD_NAME = $0) =~ s!.*/(.*)!$1!;
my $MAIN_PID = substr("000000".$$,-6);

my $HELP_MESSAGE = qq#COMMAND NAME: $CMD_NAME
Developed by Miao Chen

Work Email:
michen\@pivotal.io
Private Email:
miaochen\@mail.ustc.edu.cn
************************************************************************************************
SYNOPSIS
************************************************************************************************
$CMD_NAME [--database database] [--port port] [-f filename] [--object-bit hex string]
    [-B batch_size] [-h|--help] [--version]
*****************************************************
OPTIONS
*****************************************************

--database <Database name>

  Restore ddl to this database.
  If not specified, use \$PGDATABSE

--port <master port>

  Database port number, If not specified, use \$PGPORT or the default is 5432.
  eg.
  --port 5433

-f <filename>

  Input ddl from this file.

--object-bit <hex string>

  Match which object types will be restored, the default is FFFF.
  FFFF  means all objects
  2000  means ResourceQueue
  1000  means Role
  0800  means TableSpace
  0400  means RoleSetting
  0200  means Language
  0100  means Schema
  0080  means Type
  0040  means Function
  0020  means Sequence
  0010  means Table
  0008  means ExtTable
  0004  means View
  0002  means Index
  0001  means Comment

  example:
  --object-bit FFFF  means all global objects and in database objects
  --object-bit FC00  means all global objects
  --object-bit 03FF  means all in database objects
  --object-bit 03FD  means all in database objects exclude index
  --object-bit 03FC  means all in database objects exclude index and comment
  --object-bit 0002  means in database only index   objects
  --object-bit 0001  means in database only comment objects

-B <batch_size>

  Sets the maximum number of querys that $CMD_NAME concurrently restore to database.
  If not specified, the default is $BATCH_DEFAULT.
  The max is $BATCH_MAX.
  The min is $BATCH_MIN.

-h|--help

  Displays the online help.

--version

  Displays the command version.

example:
$CMD_NAME --database postgres --port 5432 -f postgres.ddl
$CMD_NAME -h | --help
#;

sub printMessage{
    my ($flag,$message) = @_;
    my $time_flag = strftime("%Y%m%d:%H:%M:%S.",localtime).$MAIN_PID;
    $message = "$time_flag-[$flag]-:$message\n";
    if("ERROR" eq $flag){
        print STDERR $message;
    }else{
        print STDOUT $message;
    }
}
sub exitMain{
    my ($code) = @_;
    exit $code;
}
sub errorMessage{
    my ($message) = @_;
    printMessage("ERROR",$message);
    print "Usage: $CMD_NAME [-h|--help] [options]\n";
    exitMain(1);
}
sub trim{
    my ($string) = @_;
    $string =~ s/(^\s+|\s+$)//g;
    return $string;
}
sub output{
    print trim($_[0])."\n";
}
sub exequteQuery{
    my ($query_sql,$thread_index) = @_;
    my $script_file = "/tmp/.gpdbrestore.script.".$thread_index;
    if(!open(FILE_HANDLE,">",$script_file)){
        errorMessage("Can't open temp file: $script_file");
    }
    FILE_HANDLE->autoflush(1);
    print FILE_HANDLE $query_sql;
    close FILE_HANDLE;
    my $CMDS = "set -o pipefail;PGDATABASE=$DATABASE_NAME PGPORT=$PORT PGOPTIONS='-c statement_timeout=0 -c client_encoding=UTF8 -c standard_conforming_strings=ON";
    $CMDS = $CMDS." -c check_function_bodies=OFF -c client_min_messages=WARNING -c default_with_oids=OFF' ";
    local $/ = $RECORD_SPLIT;
    $CMDS = $CMDS."psql -R '$/' -tAXF '$SQL_DELIM' 2>&1 -f $script_file |sed -e 's/psql.*gpdbrestore.script.[0-9]*:[0-9]*: //'";
    my @result = readpipe($CMDS);
    my $return_code = $? >> 8;
    chomp(@result);
    local $/ = chr(10);
    return join("\n",@result);
}
sub getOption{
    GetOptions(
        'database:s'       => \$DATABASE_NAME,
        'port:i'           => \$PORT,
        'f:s'              => \$FROM_FILE,
        'object-bit:s'     => \$OBJECT_BIT,
        'B:i'              => \$BATCH_SIZE,
        'h|help!'          => \$IS_HELP,
        'version!'         => \$VERSION,
    );
    if(@ARGV != 0){
        errorMessage("Some parameters unknown: [@ARGV]\nPlease refer to $CMD_NAME --help");
    }
    if($IS_HELP){
        print $HELP_MESSAGE;
        exitMain(0);
    }
    if($VERSION){
        print "$CMD_NAME 1.0\n";
        exitMain(0);
    }
}
sub checkOption{
    if("" eq $DATABASE_NAME){
        $DATABASE_NAME = trim($ENV{'PGDATABASE'});
    }
    if("" eq $DATABASE_NAME){
        errorMessage("Please specify parameter: --database");
    }
    if("" eq $PORT){
        $PORT = trim($ENV{'PGPORT'});
    }
    if("" eq $PORT){
        errorMessage("Please specify parameter: --port");
    }
    if("" eq $BATCH_SIZE || $BATCH_SIZE > $BATCH_MAX || $BATCH_SIZE < $BATCH_MIN){
        printMessage("NOTICE","Not specify or out of limit, use default($BATCH_DEFAULT): -B");
        $BATCH_SIZE = $BATCH_DEFAULT;
    }
    if(!-e $FROM_FILE){
        errorMessage("No file exists named: $FROM_FILE");
    }
    $OBJECT_BIT = hex($OBJECT_BIT);
    if($OBJECT_BIT < 1){
        $OBJECT_BIT = hex("FFFF");
    }
}
sub checkConflictProcess{
    my $lock_file = "/tmp/.gpddlrestore.lock";
    if(!open($LOCK_FILE_HANDLE,">",$lock_file)){
        errorMessage("Can't open lock file: $lock_file");
    }
    my $lockCode = flock($LOCK_FILE_HANDLE, LOCK_EX | LOCK_NB);
    if(!$lockCode){
        errorMessage("Lock file is in using, you can try again later");
    }
}
sub executeRestore{
    $SIG{'KILL'} = sub{threads->exit;};
    my ($index) = @_;
    my $end = 0;
    while(1){
        my ($task_batch,$task,$tasks) = ("","",0);
        while(1){
            $task = $TASK_QUEUE->dequeue();
            nanosleep(10000);
            if("" eq $task){
                $end = 1;
                last;
            }else{
                $task_batch = $task_batch."\n".$task;
                $tasks += 1;
                if($REGION_TYPE eq "Table"){
                    $tasks += () = ($task =~ /PARTITION/g);
                }
                if($tasks > $SQL_BATCH_SIZE){
                    last;
                }
            }
        }
        if("" ne $task_batch){
            my $msg = exequteQuery($task_batch,$index);
            $MSGE_QUEUE->enqueue($msg);
        }
        if($end){
            last;
        }
    }
    $MSGE_QUEUE->enqueue(undef);
}
sub executeMessage{
    $SIG{'KILL'} = sub{threads->exit;};
    my ($batch_size) = @_;
    my ($expect_end) = (0);
    my $msg = $MSGE_QUEUE->dequeue();
    while(1){
        if(defined $msg){
            output($msg);
        }else{
            $expect_end += 1;
            if($expect_end eq $CURR_BATCH_SIZE){
                last;
            }
        }
        $msg = $MSGE_QUEUE->dequeue();
    }
}
sub restoreRegion{
    my $object_bit = $OBJECT_BIT_HASH{$REGION_TYPE};
    if("" eq $object_bit){
        return;
    }
    unless($object_bit & $OBJECT_BIT){
        return;
    }
    my @task_list = @_;
    $TASK_QUEUE = Thread::Queue->new();
    $MSGE_QUEUE = Thread::Queue->new();
    for my $task(@task_list){
        $TASK_QUEUE->enqueue($task);
    }
    @TASK_THREAD = ();
    for my $index(0 .. $CURR_BATCH_SIZE - 1){
        $TASK_QUEUE->enqueue(undef);
        my $task_thread = threads->new(\&executeRestore,$index);
        push @TASK_THREAD,$task_thread;
    }
    $MSGE_THREAD = threads->new(\&executeMessage,$CURR_BATCH_SIZE);
    $MSGE_THREAD->join;
    for my $thread(@TASK_THREAD){
        $thread->join;
    }
}
sub restoreFromFile{
    if(!open($FILE_HANDLE,"<",$FROM_FILE)){
        errorMessage("Can't open file: $FROM_FILE");
    }
    my @task_list = ();
    my @line_list = ();
    while(my $line = <$FILE_HANDLE>){
        if($line =~ /^$REGION_START/){
            $REGION_TYPE = trim($line);
            $REGION_TYPE =~ s/$REGION_START//;
            output(qq{Start to restore $REGION_TYPE}.("." x 16));
        }elsif($line =~ /^$REGION_END/){
            push @task_list,join("",@line_list);
            @line_list = ();
            if(exists $OBJECT_BATCH_HASH{$REGION_TYPE}){
                $SQL_BATCH_SIZE = $OBJECT_BATCH_HASH{$REGION_TYPE};
            }else{
                $SQL_BATCH_SIZE = $DEFAULT_BATCH;
            }
            if(exists $BATCH_REGION_HASH{$REGION_TYPE}){
                $CURR_BATCH_SIZE = $BATCH_SIZE;
            }else{
                $CURR_BATCH_SIZE = 1;
            }
            restoreRegion(@task_list);
            @task_list = ();
        }elsif($line =~ /^$TASK_SPLIT/){
            push @task_list,join("",@line_list);
            @line_list = ();
        }else{
            push @line_list,$line;
        }
    }
    close $FILE_HANDLE;
    return @line_list;
}
sub clearTempFile{
    my $lock_file = "/tmp/.gpddlrestore.lock";
    if($LOCK_FILE_HANDLE){
        close $LOCK_FILE_HANDLE;
    }
    unlink $lock_file;
    system(qq{rm -f /tmp/.gpdbrestore.script.*});
}
sub main{
    getOption();
    checkOption();
    checkConflictProcess();
    restoreFromFile();
    clearTempFile();
}
my $command_string = $0." ".join(" ",@ARGV);
STDOUT->autoflush(1);
STDERR->autoflush(1);
main($command_string);
